added some development tools
[windows-sources.git] / developer / Samples / NET Standard / ParallelExtensionsExtras_Standard / TaskSchedulers / IOCompletionPortTaskScheduler.cs
blobf3aa6f9c02da83e9cc97c394e30aec14737405ca
1 //--------------------------------------------------------------------------
2 //
3 // Copyright (c) Microsoft Corporation. All rights reserved.
4 //
5 // File: IOCompletionPortTaskScheduler.cs
6 //
7 //--------------------------------------------------------------------------
9 using System.Collections.Concurrent;
10 using System.Collections.Generic;
11 using System.ComponentModel;
12 using System.Runtime.InteropServices;
13 using Microsoft.Win32.SafeHandles;
namespace System.Threading.Tasks.Schedulers
{
    /// <summary>Provides a TaskScheduler that uses an I/O completion port for concurrency control.</summary>
    /// <remarks>
    /// Each queued task is stored in a concurrent queue and announced by posting one completion packet to the port.
    /// A fixed set of background threads blocks on the port; each retrieved packet causes one task to be dequeued and run.
    /// </remarks>
    public sealed class IOCompletionPortTaskScheduler : TaskScheduler, IDisposable
    {
        /// <summary>The queue of tasks to be scheduled.</summary>
        private readonly ConcurrentQueue<Task> m_tasks;
        /// <summary>The I/O completion port to use for concurrency control.</summary>
        private readonly IOCompletionPort m_iocp;
        /// <summary>Whether the current thread is a scheduler thread.</summary>
        private readonly ThreadLocal<bool> m_schedulerThread;
        /// <summary>Event used to wait for all threads to shutdown.</summary>
        private readonly CountdownEvent m_remainingThreadsToShutdown;
        /// <summary>Whether Dispose has already run, so that repeated calls are no-ops.</summary>
        private bool m_disposed;

        /// <summary>Initializes the IOCompletionPortTaskScheduler.</summary>
        /// <param name="maxConcurrencyLevel">The maximum number of threads in the scheduler to be executing concurrently.</param>
        /// <param name="numAvailableThreads">The number of threads to have available in the scheduler for executing tasks.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="maxConcurrencyLevel"/> or <paramref name="numAvailableThreads"/> is less than 1.
        /// </exception>
        /// <exception cref="Win32Exception">The I/O completion port could not be created.</exception>
        public IOCompletionPortTaskScheduler(int maxConcurrencyLevel, int numAvailableThreads)
        {
            // Validate arguments. Both are integer counts, so an out-of-range exception
            // (rather than a null-argument exception) describes the failure accurately.
            if (maxConcurrencyLevel < 1) throw new ArgumentOutOfRangeException("maxConcurrencyLevel");
            if (numAvailableThreads < 1) throw new ArgumentOutOfRangeException("numAvailableThreads");

            m_tasks = new ConcurrentQueue<Task>();
            m_iocp = new IOCompletionPort(maxConcurrencyLevel);
            m_schedulerThread = new ThreadLocal<bool>();
            m_remainingThreadsToShutdown = new CountdownEvent(numAvailableThreads);

            // Create and start the threads
            for (int i = 0; i < numAvailableThreads; i++)
            {
                new Thread(WorkerLoop) { IsBackground = true }.Start();
            }
        }

        /// <summary>Body of each scheduler thread: processes work items until the completion port is closed.</summary>
        private void WorkerLoop()
        {
            try
            {
                // Note that this is a scheduler thread. Used for inlining checks.
                m_schedulerThread.Value = true;

                // Continually wait on the I/O completion port until
                // there's a work item, then process it.
                while (m_iocp.WaitOne())
                {
                    Task next;
                    if (m_tasks.TryDequeue(out next)) TryExecuteTask(next);
                }
            }
            finally
            {
                // Always report this thread's exit so that Dispose can stop waiting.
                m_remainingThreadsToShutdown.Signal();
            }
        }

        /// <summary>Dispose of the scheduler.</summary>
        /// <remarks>Calling this method more than once has no additional effect.</remarks>
        public void Dispose()
        {
            if (m_disposed) return;
            m_disposed = true;

            // Close the I/O completion port. This will cause any threads blocked
            // waiting for items to wake up.
            m_iocp.Dispose();

            // Wait for all threads to shutdown. This could cause deadlock
            // if Dispose is called from one of the scheduler's own threads
            // or the calling thread is otherwise part of such a cycle.
            m_remainingThreadsToShutdown.Wait();
            m_remainingThreadsToShutdown.Dispose();

            // Clean up remaining state
            m_schedulerThread.Dispose();
        }

        /// <summary>Gets a list of all tasks scheduled to this scheduler.</summary>
        /// <returns>An enumerable of all scheduled tasks.</returns>
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return m_tasks.ToArray();
        }

        /// <summary>Queues a task to this scheduler for execution.</summary>
        /// <param name="task">The task to be executed.</param>
        protected override void QueueTask(Task task)
        {
            // Store the task and let the I/O completion port know that more work has arrived.
            m_tasks.Enqueue(task);
            m_iocp.NotifyOne();
        }

        /// <summary>Try to execute a task on the current thread.</summary>
        /// <param name="task">The task to execute.</param>
        /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued to this scheduler.</param>
        /// <returns>Whether the task was executed.</returns>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // Only inline from scheduler threads. This is to ensure concurrency control
            // is able to handle inlining as well.
            return m_schedulerThread.Value && TryExecuteTask(task);
        }

        /// <summary>Provides a simple managed wrapper for an I/O completion port.</summary>
        private sealed class IOCompletionPort : IDisposable
        {
            /// <summary>Infinite timeout value to use for GetQueuedCompletionStatus.</summary>
            private const UInt32 INFINITE_TIMEOUT = unchecked((UInt32)Timeout.Infinite);
            /// <summary>Win32 error reported when the completion port is closed while a wait is outstanding.</summary>
            private const int ERROR_ABANDONED_WAIT_0 = 735;
            /// <summary>Win32 error reported when the handle is no longer valid.</summary>
            private const int ERROR_INVALID_HANDLE = 6;
            /// <summary>An invalid file handle value.</summary>
            private static readonly IntPtr INVALID_FILE_HANDLE = new IntPtr(-1);
            /// <summary>An invalid I/O completion port handle value.</summary>
            private static readonly IntPtr INVALID_IOCP_HANDLE = IntPtr.Zero;

            /// <summary>The I/O completion port handle.</summary>
            private readonly SafeFileHandle m_handle;

            /// <summary>Initializes the I/O completion port.</summary>
            /// <param name="maxConcurrencyLevel">The maximum concurrency level allowed by the I/O completion port.</param>
            /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxConcurrencyLevel"/> is less than 1.</exception>
            /// <exception cref="Win32Exception">The operating system failed to create the port.</exception>
            public IOCompletionPort(Int32 maxConcurrencyLevel)
            {
                // Validate the argument and create the port.
                if (maxConcurrencyLevel < 1) throw new ArgumentOutOfRangeException("maxConcurrencyLevel");
                m_handle = CreateIoCompletionPort(INVALID_FILE_HANDLE, INVALID_IOCP_HANDLE, UIntPtr.Zero, (UInt32)maxConcurrencyLevel);

                // CreateIoCompletionPort returns NULL on failure; surface that immediately
                // instead of failing later on the first post or wait.
                if (m_handle.IsInvalid)
                {
                    int errorCode = Marshal.GetLastWin32Error();
                    m_handle.Dispose();
                    throw new Win32Exception(errorCode);
                }
            }

            /// <summary>Clean up.</summary>
            public void Dispose()
            {
                m_handle.Dispose();
            }

            /// <summary>Notify the I/O completion port that new work is available.</summary>
            /// <exception cref="Win32Exception">The completion packet could not be posted.</exception>
            public void NotifyOne()
            {
                // The packet carries no payload; its arrival alone signals that a task is queued.
                if (!PostQueuedCompletionStatus(m_handle, 0, UIntPtr.Zero, IntPtr.Zero))
                    throw new Win32Exception();
            }

            /// <summary>Waits for an item on the I/O completion port.</summary>
            /// <returns>true if an item was available; false if the completion port closed before an item could be retrieved.</returns>
            /// <exception cref="Win32Exception">The wait failed for a reason other than the port being closed.</exception>
            public bool WaitOne()
            {
                // Wait for an item to be posted.
                // DangerousGetHandle is used so that the safe handle can be closed even while blocked in the call to GetQueuedCompletionStatus.
                UInt32 lpNumberOfBytes;
                UIntPtr lpCompletionKey;
                IntPtr lpOverlapped;
                if (GetQueuedCompletionStatus(m_handle.DangerousGetHandle(), out lpNumberOfBytes, out lpCompletionKey, out lpOverlapped, INFINITE_TIMEOUT))
                {
                    return true;
                }

                // A closed or invalidated port means shutdown; anything else is a genuine failure.
                int errorCode = Marshal.GetLastWin32Error();
                if (errorCode == ERROR_ABANDONED_WAIT_0 || errorCode == ERROR_INVALID_HANDLE)
                    return false;

                throw new Win32Exception(errorCode);
            }

            /// <summary>
            /// Creates an input/output (I/O) completion port and associates it with a specified file handle,
            /// or creates an I/O completion port that is not yet associated with a file handle, allowing association at a later time.
            /// </summary>
            /// <param name="fileHandle">An open file handle or INVALID_HANDLE_VALUE.</param>
            /// <param name="existingCompletionPort">A handle to an existing I/O completion port or NULL.</param>
            /// <param name="completionKey">The per-handle user-defined completion key that is included in every I/O completion packet for the specified file handle.</param>
            /// <param name="numberOfConcurrentThreads">The maximum number of threads that the operating system can allow to concurrently process I/O completion packets for the I/O completion port.</param>
            /// <returns>If the function succeeds, the return value is the handle to an I/O completion port. If the function fails, the return value is NULL.</returns>
            [DllImport("kernel32.dll", SetLastError = true)]
            private static extern SafeFileHandle CreateIoCompletionPort(
                IntPtr fileHandle, IntPtr existingCompletionPort, UIntPtr completionKey, UInt32 numberOfConcurrentThreads);

            /// <summary>Attempts to dequeue an I/O completion packet from the specified I/O completion port.</summary>
            /// <param name="completionPort">A handle to the completion port.</param>
            /// <param name="lpNumberOfBytes">A pointer to a variable that receives the number of bytes transferred during an I/O operation that has completed.</param>
            /// <param name="lpCompletionKey">A pointer to a variable that receives the completion key value associated with the file handle whose I/O operation has completed.</param>
            /// <param name="lpOverlapped">A pointer to a variable that receives the address of the OVERLAPPED structure that was specified when the completed I/O operation was started.</param>
            /// <param name="dwMilliseconds">The number of milliseconds that the caller is willing to wait for a completion packet to appear at the completion port.</param>
            /// <returns>Returns nonzero (TRUE) if successful or zero (FALSE) otherwise.</returns>
            [DllImport("kernel32.dll", SetLastError = true)]
            private static extern Boolean GetQueuedCompletionStatus(
                IntPtr completionPort, out UInt32 lpNumberOfBytes, out UIntPtr lpCompletionKey, out IntPtr lpOverlapped, UInt32 dwMilliseconds);

            /// <summary>Posts an I/O completion packet to an I/O completion port.</summary>
            /// <param name="completionPort">A handle to the completion port.</param>
            /// <param name="dwNumberOfBytesTransferred">The value to be returned through the lpNumberOfBytesTransferred parameter of the GetQueuedCompletionStatus function.</param>
            /// <param name="dwCompletionKey">The value to be returned through the lpCompletionKey parameter of the GetQueuedCompletionStatus function.</param>
            /// <param name="lpOverlapped">The value to be returned through the lpOverlapped parameter of the GetQueuedCompletionStatus function.</param>
            /// <returns>If the function succeeds, the return value is nonzero. If the function fails, the return value is zero.</returns>
            [DllImport("kernel32.dll", SetLastError = true)]
            private static extern Boolean PostQueuedCompletionStatus(
                SafeFileHandle completionPort, UInt32 dwNumberOfBytesTransferred, UIntPtr dwCompletionKey, IntPtr lpOverlapped);
        }
    }
}